#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import argparse
import os
import numpy as np
import pickle
from keras import backend as K
from keras.callbacks import ModelCheckpoint
from keras.models import Model
from keras.layers import Input
from keras.layers import Conv2D, MaxPooling2D, UpSampling2D, Concatenate, Flatten, Dense, Dropout
from keras.layers import merge
from keras.optimizers import Adam, SGD, RMSprop
from keras.preprocessing.image import list_pictures, array_to_img
from image_ext import list_pictures_in_multidir, load_imgs_asarray
from create_fcn import create_fcn01, create_pupil_net00
np.random.seed(2016)
def load_fnames(paths):
f = open(paths)
data1 = f.read()
f.close()
lines = data1.split('\n')
#print(len(lines))
# 最終行は空行なので消す
del(lines[len(lines)-1])
#print(len(lines))
return lines
def make_fnames(fnames,fpath,fpath_mask,mask_ext):
fnames_img = [];
fnames_mask= [];
for i in range(len(fnames)):
fnames_img.append(fpath + '/' + fnames[i]);
fnames_mask.append(fpath_mask + '/' + mask_ext + fnames[i]);
return [fnames_img,fnames_mask]
def get_center(im):
im[im>0] = 1;
xval = 0
yval = 0
npix = 0
for x in range(0,im.shape[1]):
xval += (x*sum(im[:,x]))
npix += sum(im[:,x])
for y in range(0,im.shape[0]):
yval += (y*sum(im[y,:]))
return [xval/npix,yval/npix]
#
# MAIN STARTS FROM HERE
#
if __name__ == '__main__':
target_size = (224, 224)
dpath_this = './'
dname_checkpoints = 'checkpoints_pupil_net00'
dname_checkpoints_fcn01 = 'checkpoints_fcn01'
dname_outputs = 'outputs'
fname_architecture = 'architecture.json'
fname_weights = "model_weights_{epoch:02d}.h5"
fname_stats = 'stats01.npz'
dim_ordering = 'channels_first'
fname_history = "history.pkl"
# definision of mode, LEARN or TEST or SHOW_HISTORY
mode = "LEARN"
mode = "SHOW_HISTORY"
mode = "TEST"
# モデルを作成
print('creating model...')
model_pupil_net = create_pupil_net00(target_size)
#
# LEARNING MODE
#
mode = "LEARN"
if mode == "LEARN":
# Read Learning Data
fnames = load_fnames('data/list_train_01.txt')
[fpaths_xs_train,fpaths_ys_train] = make_fnames(fnames,'data/img','data/mask','OperatorA_')
X_train = load_imgs_asarray(fpaths_xs_train, grayscale=False, target_size=target_size,
dim_ordering=dim_ordering)
Y_train = load_imgs_asarray(fpaths_ys_train, grayscale=True, target_size=target_size,
dim_ordering=dim_ordering)
# Read Validation Data
fnames = load_fnames('data/list_valid_01.txt')
[fpaths_xs_valid,fpaths_ys_valid] = make_fnames(fnames,'data/img','data/mask','OperatorA_')
X_valid = load_imgs_asarray(fpaths_xs_valid, grayscale=False, target_size=target_size,
dim_ordering=dim_ordering)
Y_valid = load_imgs_asarray(fpaths_ys_valid, grayscale=True, target_size=target_size,
dim_ordering=dim_ordering)
# obtain center of pupil
center_train = []
center_valid = []
for i in range(Y_train.shape[0]):
center_train.append(get_center(Y_train[i,0,:,:]))
for i in range(Y_valid.shape[0]):
center_valid.append(get_center(Y_valid[i,0,:,:]))
center_train = np.array(center_train)
center_valid = np.array(center_valid)
print('==> ' + str(len(X_train)) + ' training images loaded')
print('==> ' + str(len(Y_train)) + ' training masks loaded')
print('==> ' + str(len(X_valid)) + ' validation images loaded')
print('==> ' + str(len(Y_valid)) + ' validation masks loaded')
# 前処理
print('computing mean and standard deviation...')
mean = np.mean(X_train, axis=(0, 2, 3))
std = np.std(X_train, axis=(0, 2, 3))
print('==> mean: ' + str(mean))
print('==> std : ' + str(std))
print('saving mean and standard deviation to ' + fname_stats + '...')
stats = {'mean': mean, 'std': std}
if os.path.exists(dname_checkpoints) == 0:
os.mkdir(dname_checkpoints)
np.savez(dname_checkpoints + '/' + fname_stats, **stats)
print('==> done')
print('globally normalizing data...')
for i in range(3):
X_train[:, i] = (X_train[:, i] - mean[i]) / std[i]
X_valid[:, i] = (X_valid[:, i] - mean[i]) / std[i]
Y_train /= 255
Y_valid /= 255
print('==> done')
# モデルに学習済のfcn02 Weightをロードする
epoch = 200
fname_weights = 'model_weights_%02d.h5'%(epoch)
model_fcn01 = create_fcn01(target_size)
fpath_weights_fcn01 = os.path.join(dname_checkpoints_fcn01, fname_weights)
model_fcn01.load_weights(fpath_weights_fcn01)
# load weights from Learned U-NET
layer_names = ['conv1_1','conv1_2','conv2_1','conv2_2']
print('copying layer weights')
for name in layer_names:
print(name)
model_pupil_net.get_layer(name).set_weights(model_fcn01.get_layer(name).get_weights())
model_pupil_net.get_layer(name).trainable = True
# 損失関数,最適化手法を定義
adam = Adam(lr=1e-5)
model_pupil_net.compile(optimizer=adam, loss='mean_squared_error')
# 構造・重みを保存するディレクトリーの有無を確認
dpath_checkpoints = os.path.join(dpath_this, dname_checkpoints)
if not os.path.isdir(dpath_checkpoints):
os.mkdir(dpath_checkpoints)
# 重みを保存するためのオブジェクトを用意
fname_weights = "model_weights_{epoch:02d}.h5"
fpath_weights = os.path.join(dpath_checkpoints, fname_weights)
checkpointer = ModelCheckpoint(filepath=fpath_weights, save_best_only=False)
# トレーニングを開始
print('start training...')
history = model_pupil_net.fit(X_train, center_train, batch_size=64, epochs=200, verbose=1,
shuffle=True, validation_data=(X_valid, center_valid), callbacks=[checkpointer])
# Save History
f = open(dname_checkpoints + '/' + fname_history,'wb')
pickle.dump(history.history,f)
f.close
#
# TEST MODE
#
mode = "TEST"
if mode == "TEST":
# Prediction (test) mode
# 学習済みの重みをロード
epoch = 200
fname_weights = 'model_weights_%02d.h5'%(epoch)
fpath_weights = os.path.join(dname_checkpoints, fname_weights)
model_pupil_net.load_weights(fpath_weights)
print('==> done')
# Read Test Data
fnames = load_fnames('data/list_test_01.txt')
[fpaths_xs_test,fpaths_ys_test] = make_fnames(fnames,'data/img','data/mask','OperatorA_')
X_test = load_imgs_asarray(fpaths_xs_test, grayscale=False, target_size=target_size,
dim_ordering=dim_ordering)
Y_test = load_imgs_asarray(fpaths_ys_test, grayscale=True, target_size=target_size,
dim_ordering=dim_ordering)
# Yを初期化
center_test = []
for i in range(Y_test.shape[0]):
center_test.append(get_center(Y_test[i,0,:,:]))
center_test = np.array(center_test)
# トレーニング時に計算した平均・標準偏差をロード
print('loading mean and standard deviation from ' + fname_stats + '...')
stats = np.load(dname_checkpoints + '/' + fname_stats)
mean = stats['mean']
std = stats['std']
print('==> mean: ' + str(mean))
print('==> std : ' + str(std))
for i in range(3):
X_test[:, i] = (X_test[:, i] - mean[i]) / std[i]
print('==> done')
# テストを開始
outputs = model_pupil_net.predict(X_test)
diff = outputs - center_test;
# print(outputs)
# print(center_test)
# print(diff)
#
# for i in range(diff.shape[0]):
# print(np.linalg.norm(diff[i,:]))
print('L2 norm av. = %f'%(np.sum(np.linalg.norm(diff,axis=1))/diff.shape[0]))
from PIL import Image
import matplotlib.pyplot as plt
n = 0
for i in range(len(fpaths_xs_test)):
# テスト画像
im1 = Image.open(fpaths_xs_test[i])
im1 = im1.resize(target_size)
# Show pupil prediction
print('(%f,%f) - (%f,%f)(gt): %f'%(outputs[i,0],outputs[i,1],
center_test[i,0],center_test[i,1],
np.linalg.norm(diff[i,:])))
plt.imshow(im1)
plt.plot(outputs[i,0], outputs[i,1],'x')
plt.plot(center_test[i,0], center_test[i,1], '*')
plt.show()
n = n + 1
#
# Show History
#
mode = 'SHOW_HISTORY'
if mode == "SHOW_HISTORY":
# load pickle
print(dname_checkpoints + '/' + fname_history)
history = pickle.load(open(dname_checkpoints + '/' + fname_history, 'rb'))
for k in history.keys():
plt.plot(history[k])
plt.title(k)
plt.show()